package com.atguigu.gmall.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.text.SimpleDateFormat;
import java.util.Properties;

/**
 * Author: Felix
 * Date: 2021/12/24
 * Desc: 日志数据的分流
 * 需要启动的进程
 *      zk、kafka、hdfs、logger.sh(Nginx + 日志采集服务)、BaseLogApp
 * 执行流程
 *      -运行模拟生成日志数据的jar包
 *      -将生成的日志数据 发送给Nginx
 *      -Nginx进行负载均衡，将请求转发给202、203、204三台日志采集服务
 *      -日志采集对日志进行处理
 *          >打印
 *          >落盘
 *          >发送到kafka主题 ods_base_log
 *      -BaseLogApp从ods_base_log主题中读取数据
 *
 */
public class BaseLogApp {
    public static void main(String[] args) throws Exception {
        //TODO 1.基本环境的准备
        //1.1 设置流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);

        /*//TODO 2.检查点设置
        //2.1 开启检查点
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //2.2 超时时间
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        //2.3 job取消之后  检查点是否保留
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        //2.5 设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/ck"));
        //2.6 设置操作hadoop的用户
        //System.setProperty("HADOOP_USER_NAME","atguigu");
        */
        //TODO 3.从Kafka中读取数据
        //3.1 声明消费的主题以及消费者组
        String topic = "ods_base_log";
        String groupId = "base_log_app_group";
        //3.2 创建消费者对象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        //3.3 消费数据  封装为流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4.对流中的数据类型进行转换   jsonString-->jsonObj
        /*
        //匿名内部类
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(
            new MapFunction<String, JSONObject>() {
                @Override
                public JSONObject map(String jsonStr) throws Exception {
                    return JSON.parseObject(jsonStr);
                }
            }
        );
        //Lambda表达式
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(
            jsonStr -> JSON.parseObject(jsonStr)
        );
        */
        //方法的默认调用
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //jsonObjDS.print(">>>>>");

        //TODO 5.新老访客状态标记修复
        //5.1 按照mid对数据进行分组
        KeyedStream<JSONObject, String> keyedDS =
            jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

        //5.2 使用Flink的状态编程 对新老访客标记进行修复
        SingleOutputStreamOperator<JSONObject> jsonObjWithIsNewDS = keyedDS.map(
            new RichMapFunction<JSONObject, JSONObject>() {
                //注意：状态不能在声明的时候进行赋值，需要在open方法被调用后，才能够通过getRuntimeContext()获取运行时上下文
                private ValueState<String> lastVisitorDateState;
                private SimpleDateFormat sdf;

                @Override
                public void open(Configuration parameters) throws Exception {
                    lastVisitorDateState
                        = getRuntimeContext().getState(new ValueStateDescriptor<String>("lastVisitorDateState", String.class));
                    sdf = new SimpleDateFormat("yyyyMMdd");
                }

                @Override
                public JSONObject map(JSONObject jsonObj) throws Exception {
                    //获取新老访客标记
                    String isNew = jsonObj.getJSONObject("common").getString("is_new");
                    //如果访客标记是1，有可能出现不准确的情况，需要进行修复
                    if ("1".equals(isNew)) {
                        //从状态中获取上次访问日期
                        String lastVisitDate = lastVisitorDateState.value();
                        //获取当前访问日期
                        String curVisitDate = sdf.format(jsonObj.getLong("ts"));
                        //判断上次访问日期是否为空
                        if (lastVisitDate != null && lastVisitDate.length() > 0) {
                            //如果上次访问日期不为null，说明曾经访问过，需要对标记进行修复
                            if (!lastVisitDate.equals(curVisitDate)) {
                                isNew = "0";
                                jsonObj.getJSONObject("common").put("is_new", isNew);
                            }

                        } else {
                            //如果上次访问日期为null，说明曾经没有访问过，当前访问为第一次访问,将当前访问日期放到状态中保存
                            lastVisitorDateState.update(curVisitDate);
                        }
                    }

                    return jsonObj;
                }
            }
        );

        //jsonObjWithIsNewDS.print(">>>>>");

        //TODO 6.日志分流       启动日志--启动侧输出流   曝光日志--曝光侧输出流中  页面日志--主流
        //6.1 定义侧输出流标签
        OutputTag<String> startTag = new OutputTag<String>("startTag"){};
        OutputTag<String> displayTag = new OutputTag<String>("displayTag"){};

        //6.2 分流
        SingleOutputStreamOperator<String> pageDS = jsonObjWithIsNewDS.process(
            new ProcessFunction<JSONObject, String>() {
                @Override
                public void processElement(JSONObject jsonObj, Context ctx, Collector<String> out) throws Exception {
                    JSONObject startJsonObj = jsonObj.getJSONObject("start");
                    String jsonStr = jsonObj.toJSONString();
                    //判断是否为启动日志
                    if (startJsonObj != null && startJsonObj.size() > 0) {
                        //启动日志  放到启动侧输出流中
                        ctx.output(startTag, jsonStr);
                    } else {
                        //除了启动日志之外，其他的都属于页面日志   放到主流中
                        out.collect(jsonStr);

                        //如果在页面中有displays属性，说明该页面有曝光信息，将曝光数据放到曝光侧输出流
                        JSONArray displaysArr = jsonObj.getJSONArray("displays");
                        if (displaysArr != null && displaysArr.size() > 0) {
                            Long ts = jsonObj.getLong("ts");
                            String pageId = jsonObj.getJSONObject("page").getString("page_id");
                            //说明有曝光信息  ,对曝光数据进行遍历
                            for (int i = 0; i < displaysArr.size(); i++) {
                                JSONObject displayJsonObj = displaysArr.getJSONObject(i);
                                displayJsonObj.put("ts", ts);
                                displayJsonObj.put("page_id", pageId);
                                ctx.output(displayTag, displayJsonObj.toJSONString());
                            }
                        }
                    }
                }
            }
        );

        //6.3 获取侧输出流
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);

        pageDS.print(">>>>");
        startDS.print("####");
        displayDS.print("&&&&");


        //TODO 7.将不同流中的数据写到kafka的不同主题中
        startDS.addSink(MyKafkaUtil.getKafkaSink("dwd_start_log"));
        displayDS.addSink(MyKafkaUtil.getKafkaSink("dwd_display_log"));
        pageDS.addSink(MyKafkaUtil.getKafkaSink("dwd_page_log"));

        env.execute();
    }
}
